// Copyright (c) 2022 by Duguang.IO Inc. All Rights Reserved.
// Author: Ethan Liu
// Date: 2022-05-10 18:42:48

package client

import (
	"bytes"
	"context"
	"crypto/tls"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"net/http"
	"time"

	"jianmu-worker-kube/engine"
)

// URL path templates for the worker API. Each template is formatted with
// fmt.Sprintf and appended to HTTPClient.Endpoint. The first %s is always
// the worker ID; where a second %s is present it is a task (or runner) ID.
const (
	// apiJoin is the path used by Join (PUT).
	apiJoin    = "/workers/%s/join"
	// apiPing is the path used by Ping (GET).
	apiPing    = "/workers/%s/ping"
	// apiRequest is the path used by Request (GET).
	apiRequest = "/workers/kubernetes/%s/tasks"
	// apiAccept is the path used by Accept (PATCH).
	apiAccept  = "/workers/%s/tasks/%s/accept"
	// apiFind is the path used by FindById (GET).
	apiFind    = "/workers/kubernetes/%s/tasks/%s"
	// apiUpdate is the path used by both Update (PATCH) and Watch (POST).
	apiUpdate  = "/workers/%s/tasks/%s"
	// apiBatch is the path used by Batch (POST).
	apiBatch   = "/workers/%s/tasks/%s/logs/batch"
	// apiUpload is the path used by Upload (POST).
	apiUpload  = "/workers/%s/tasks/%s/logs"
)

// HTTPClient is the HTTP implementation of the worker API client.
type HTTPClient struct {
	// Client is the HTTP client used to send requests. When it is nil
	// the package-level default client is used instead.
	Client     *http.Client
	// Endpoint is the server base URL; request paths are appended to it
	// verbatim.
	Endpoint   string
	// Secret is sent with every request in the X-Jianmu-Token header.
	Secret     string
	// WorkerId identifies this worker and is substituted into every
	// request path.
	WorkerId   string
	// SkipVerify records whether New was asked to skip TLS certificate
	// verification. New acts on the flag itself by installing a Client
	// whose transport does not verify certificates.
	SkipVerify bool
}

// New builds a worker client that talks to the server at endpoint,
// authenticates with secret and identifies itself as workerId.
//
// When skipverify is true the client gets its own http.Client that honours
// the proxy environment variables, does not follow redirects and does not
// verify TLS certificates. Otherwise Client is left nil.
func New(endpoint, secret string, skipverify bool, workerId string) *HTTPClient {
	c := new(HTTPClient)
	c.Endpoint = endpoint
	c.Secret = secret
	c.WorkerId = workerId
	c.SkipVerify = skipverify

	if !skipverify {
		return c
	}

	insecure := &http.Transport{
		Proxy:           http.ProxyFromEnvironment,
		TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
	}
	c.Client = &http.Client{
		Transport: insecure,
		CheckRedirect: func(*http.Request, []*http.Request) error {
			return http.ErrUseLastResponse
		},
	}
	return c
}

// Compile-time assertion that *HTTPClient implements the Client interface.
var _ Client = (*HTTPClient)(nil)

// defaultClient is the http.Client used when HTTPClient.Client is nil.
// Its CheckRedirect hook returns http.ErrUseLastResponse, so redirects are
// not followed and the redirect response itself is handed back.
var defaultClient = &http.Client{
	CheckRedirect: func(*http.Request, []*http.Request) error {
		return http.ErrUseLastResponse
	},
}

// Ping issues a single GET against this worker's ping endpoint and returns
// whatever error the request produced, making it usable as a connectivity
// check. The request is not retried.
func (p *HTTPClient) Ping(ctx context.Context) error {
	_, err := p.do(ctx, fmt.Sprintf(apiPing, p.WorkerId), "GET", nil, nil)
	return err
}

// Join sends the worker description as the JSON body of a single PUT to
// this worker's join endpoint. The request is not retried.
func (p *HTTPClient) Join(ctx context.Context, worker *engine.Worker) error {
	path := fmt.Sprintf(apiJoin, p.WorkerId)
	if _, err := p.do(ctx, path, "PUT", worker, nil); err != nil {
		return err
	}
	return nil
}

// Request asks the server for a new task to execute and decodes the reply
// into an engine.Unit. The call goes through retry. args is currently not
// sent to the server. The returned unit is non-nil even when an error is
// returned.
func (p *HTTPClient) Request(ctx context.Context, args *Filter) (*engine.Unit, error) {
	unit := &engine.Unit{}
	_, err := p.retry(ctx, fmt.Sprintf(apiRequest, p.WorkerId), "GET", nil, unit)
	return unit, err
}

// Accept confirms that this worker will execute the task described by
// runner, using optimistic locking. The runner is sent as the JSON body of
// a PATCH and the server's reply is decoded into a fresh engine.Runner.
//
// On success runner.Version is updated to the version returned by the
// server. On failure runner is left untouched and the error is returned;
// a 409 response surfaces as ErrOptimisticLock.
func (p *HTTPClient) Accept(ctx context.Context, runner *engine.Runner) error {
	uri := fmt.Sprintf(apiAccept, p.WorkerId, runner.ID)
	accepted := new(engine.Runner)
	if _, err := p.retry(ctx, uri, "PATCH", runner, accepted); err != nil {
		// Nothing was decoded into accepted, so copying its Version
		// would clobber the caller's value with the zero value.
		return err
	}
	runner.Version = accepted.Version
	return nil
}

// FindById looks a task up by its ID and decodes the reply into an
// engine.Unit. The call goes through retry. The returned unit is non-nil
// even when an error is returned.
func (p *HTTPClient) FindById(ctx context.Context, id string) (*engine.Unit, error) {
	var (
		path = fmt.Sprintf(apiFind, p.WorkerId, id)
		unit = new(engine.Unit)
	)
	_, err := p.retry(ctx, path, "GET", nil, unit)
	return unit, err
}

// Batch uploads a batch of log lines for the given task in real time.
// The lines are sent as the JSON body of a single POST; the request is
// not retried.
func (p *HTTPClient) Batch(ctx context.Context, taskId string, lines []*engine.Line) error {
	path := fmt.Sprintf(apiBatch, p.WorkerId, taskId)
	if _, err := p.do(ctx, path, "POST", &lines, nil); err != nil {
		return err
	}
	return nil
}

// Update reports the task's state to the server. The task is sent as the
// JSON body of a PATCH to the task's endpoint, and the call goes through
// retry.
func (p *HTTPClient) Update(ctx context.Context, task *engine.Task) error {
	_, err := p.retry(ctx, fmt.Sprintf(apiUpdate, p.WorkerId, task.TaskId), "PATCH", task, nil)
	return err
}

// Watch monitors whether the task has been cancelled. It sends a POST to
// the task's endpoint through retry and reports true when the server
// answers with status 200, false for any other status. A failed request
// yields false together with the error.
func (p *HTTPClient) Watch(ctx context.Context, taskId string) (bool, error) {
	path := fmt.Sprintf(apiUpdate, p.WorkerId, taskId)
	res, err := p.retry(ctx, path, "POST", nil, nil)
	if err != nil {
		return false, err
	}
	return res.StatusCode == 200, nil
}

// Upload sends the complete log of the given task. The lines are sent as
// the JSON body of a POST, and the call goes through retry.
func (p *HTTPClient) Upload(ctx context.Context, taskId string, lines []*engine.Line) error {
	path := fmt.Sprintf(apiUpload, p.WorkerId, taskId)
	if _, err := p.retry(ctx, path, "POST", &lines, nil); err != nil {
		return err
	}
	return nil
}

// errBuildRequest marks failures that occur before anything is sent to the
// server: the payload cannot be JSON-encoded or the request cannot be
// constructed. Repeating the call cannot fix them, so retry treats them as
// permanent.
var errBuildRequest = errors.New("client: cannot build request")

// retry calls do repeatedly until the outcome is final.
//
// It returns immediately when ctx is cancelled or past its deadline (the
// context's error is returned), when do reports ErrOptimisticLock, or when
// the request could not be built at all. It waits and tries again when:
//
//   - the response status is above 501 (for example 502, 503, 504): 10s
//   - the response status is 204, which do returns without an error: 1s
//   - no response was received and do returned an error: 10s
//
// Any other outcome is returned to the caller unchanged. Waiting is
// interrupted as soon as ctx is done.
func (p *HTTPClient) retry(ctx context.Context, path, method string, in, out interface{}) (*http.Response, error) {
	for {
		res, err := p.do(ctx, path, method, in, out)

		// do not retry on Canceled or DeadlineExceeded
		if ctxErr := ctx.Err(); ctxErr != nil {
			return res, ctxErr
		}
		// do not retry on optimistic lock errors, nor when the
		// request could not even be built.
		if errors.Is(err, ErrOptimisticLock) || errors.Is(err, errBuildRequest) {
			return res, err
		}

		var wait time.Duration
		switch {
		case res != nil && res.StatusCode > http.StatusNotImplemented:
			// Statuses above 501 are typically not permanent and
			// may relate to outages on the server side; give the
			// server time to recover.
			wait = 10 * time.Second
		case res != nil && res.StatusCode == http.StatusNoContent:
			// 204 is used by the server when a long-polling
			// request is intentionally disconnected and should be
			// automatically reconnected.
			wait = time.Second
		case res == nil && err != nil:
			// No response at all: treat it as a transport problem.
			wait = 10 * time.Second
		default:
			return res, err
		}

		// Wait before the next attempt, but give up at once if the
		// context ends in the meantime.
		timer := time.NewTimer(wait)
		select {
		case <-ctx.Done():
			timer.Stop()
			return res, ctx.Err()
		case <-timer.C:
		}
	}
}

// do sends one HTTP request to p.Endpoint+path and processes the reply.
//
// When in is non-nil it is JSON-encoded into the request body. The request
// carries a JSON Content-Type and the shared secret in the X-Jianmu-Token
// header. The response body is always drained (up to 4096 bytes) and
// closed before do returns, so the returned response is only useful for
// its status and headers.
//
// Results:
//   - the payload cannot be encoded or the request cannot be constructed:
//     a nil response and an error wrapping errBuildRequest
//   - the HTTP round trip fails: the client's response and error as-is
//   - status 204: the response and a nil error; out is not touched
//   - status 409: a nil response and ErrOptimisticLock
//   - any other status above 299: the response and an error whose text is
//     the response body, or the standard status text if the body is empty
//   - otherwise: the body is decoded into out when out is non-nil, and any
//     decoding error is returned
func (p *HTTPClient) do(ctx context.Context, path, method string, in, out interface{}) (*http.Response, error) {
	var buf bytes.Buffer

	// marshal the input payload into json format; a payload that cannot
	// be encoded must not be sent half-written.
	if in != nil {
		if err := json.NewEncoder(&buf).Encode(in); err != nil {
			return nil, fmt.Errorf("%w: %v", errBuildRequest, err)
		}
	}

	req, err := http.NewRequestWithContext(ctx, method, p.Endpoint+path, &buf)
	if err != nil {
		return nil, fmt.Errorf("%w: %v", errBuildRequest, err)
	}

	req.Header.Set("Content-Type", "application/json; charset=utf-8")
	// the request should include the secret shared between
	// the agent and server for authorization.
	req.Header.Add("X-Jianmu-Token", p.Secret)

	res, err := p.client().Do(req)
	if res != nil {
		defer func() {
			// drain the response body so we can reuse this
			// connection. This is best effort, so failures
			// here are deliberately ignored.
			io.Copy(ioutil.Discard, io.LimitReader(res.Body, 4096))
			res.Body.Close()
		}()
	}
	if err != nil {
		return res, err
	}

	switch res.StatusCode {
	case http.StatusNoContent:
		// no content: nothing to read or unmarshal, and not an error.
		return res, nil
	case http.StatusConflict:
		// a conflict indicates an optimistic lock error, in which
		// case multiple clients may be attempting to update the same
		// record. Convert this status to a proper error.
		return nil, ErrOptimisticLock
	}

	// else read the response body into a byte slice.
	body, err := ioutil.ReadAll(res.Body)
	if err != nil {
		return res, err
	}

	if res.StatusCode > 299 {
		// prefer the error message supplied in the response body and
		// fall back to the default status text when the body is empty.
		if len(body) != 0 {
			return res, errors.New(string(body))
		}
		return res, errors.New(http.StatusText(res.StatusCode))
	}
	if out == nil {
		return res, nil
	}

	return res, json.Unmarshal(body, out)
}

// client returns the HTTP client to use for requests: the custom Client
// when one is set, otherwise the package default client.
func (p *HTTPClient) client() *http.Client {
	if custom := p.Client; custom != nil {
		return custom
	}
	return defaultClient
}
